package com.wx.learn.kafka.consumer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/**
 * @author jxlgzzw
 * @date 2020-06-06 15:36
 * @description Kafka consumer that buffers records and flushes them to HDFS in batches.
 */
public class ConsumerTest {

    /** Number of records to buffer before flushing a batch to HDFS. */
    private static final int BATCH_SIZE = 10;

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        prop.setProperty("bootstrap.servers", "ud2:9092,ud3:9092,ud4:9092");
        prop.setProperty("group.id", "group1");

        // The client-side Consumer only connects to the Kafka cluster and fetches
        // messages; writing them out (to HDFS here) is this program's job.
        // try-with-resources closes the consumer (releasing its group membership)
        // even if an exception escapes the poll loop — the original leaked it.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
            // Subscribe to the topic.
            consumer.subscribe(Collections.singleton("test1"));

            // Buffer records, then write the whole batch out at once.
            List<ConsumerRecord<String, String>> recordList = new ArrayList<>();

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    recordList.add(record);
                    System.out.println("集合的大小:" + recordList.size());
                    // Flush on buffer size rather than a separate counter: the
                    // original `count++ == 10` fired on the 11th record, and the
                    // counter could drift from the list if a flush failed.
                    if (recordList.size() >= BATCH_SIZE) {
                        System.out.println("执行if判断....");
                        try {
                            flushToHdfs(recordList);
                            // Clear only after a successful write so a failed
                            // flush retries the same records next time around.
                            recordList.clear();
                        } catch (IOException e) {
                            // Best-effort, matching the original: log and keep consuming.
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }

    /**
     * Writes the value of each buffered record as one line into a new,
     * timestamp-named file under {@code /kafka} on HDFS.
     *
     * @param records batch of consumed records to persist (value is the message payload)
     * @throws IOException if HDFS cannot be reached or the file cannot be written
     */
    private static void flushToHdfs(List<ConsumerRecord<String, String>> records) throws IOException {
        Configuration conf = new Configuration();
        // fs.defaultFS must be a full URI; the original's bare "ud2:9000" has no
        // scheme and is rejected by FileSystem.get().
        conf.set("fs.defaultFS", "hdfs://ud2:9000");
        FileSystem fs = FileSystem.get(conf);

        Path dir = new Path("/kafka");
        if (!fs.exists(dir)) {
            fs.mkdirs(dir);
        }

        Path file = new Path(dir, String.valueOf(System.currentTimeMillis()));
        // try-with-resources closes the stream even when a write throws (the
        // original leaked it on failure); explicit UTF-8 avoids depending on
        // the JVM's platform charset. autoFlush=true, so no manual flush needed.
        try (FSDataOutputStream fsdos = fs.create(file);
             PrintWriter pw = new PrintWriter(new OutputStreamWriter(fsdos, StandardCharsets.UTF_8), true)) {
            records.forEach(r -> pw.println(r.value()));
        }
    }

}
